{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Futures in IPython Parallel\n",
    "\n",
    "New in IPython Parallel 5.0 is the fact that our AsyncResult object is now a Future\n",
    "(specifically a subclass of concurrent.futures.Future).\n",
    "\n",
    "This means it can be integrated into any Future-using application."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "<DirectView [0, 1, 2, 3]>"
      ]
     },
     "execution_count": 1,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import ipyparallel as ipp\n",
    "rc = ipp.Client()\n",
    "dv = rc[:]\n",
    "dv"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Do some imports everywhere"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "%%px --local\n",
    "import os\n",
    "import time\n",
    "import numpy\n",
    "from numpy.linalg import norm"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "def random_norm(n):\n",
    "    \"\"\"Generates a 1xN array and computes its 2-norm\"\"\"\n",
    "    A = numpy.random.random(n)\n",
    "    return norm(A, 2)\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The basic async API hasn't changed:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "<AsyncResult: random_norm>"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "f = rc[-1].apply(random_norm, 100)\n",
    "f"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "5.9875490723743265"
      ]
     },
     "execution_count": 5,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "f.get()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "But the familiar AsyncResult object is now a Future:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[ipyparallel.client.asyncresult.AsyncResult,\n",
       " concurrent.futures._base.Future,\n",
       " object]"
      ]
     },
     "execution_count": 6,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "f.__class__.mro()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This means that we can use Future APIs to access results, etc."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "5.9875490723743265"
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "f.result()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "I got PID: 7892\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "7892"
      ]
     },
     "execution_count": 8,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import os\n",
    "f = rc[-1].apply(os.getpid)\n",
    "f.add_done_callback(lambda _: print(\"I got PID: %i\" % _.result()))\n",
    "f.get()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "A more complex example shows us how AsyncResults can be integrated into existing async applications, now that they are Futures:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      ". [7885, 7886, 7891, 7892]\n",
      ". . [7885, 7886, 7891, 7892]\n",
      ". . . [7885, 7886, 7891, 7892]\n",
      ". . . . [7885, 7886, 7891, 7892]\n"
     ]
    }
   ],
   "source": [
    "from tornado.gen import coroutine, sleep\n",
    "from tornado.ioloop import IOLoop\n",
    "import sys\n",
    "\n",
    "def sleep_task(t):\n",
    "    time.sleep(t)\n",
    "    return os.getpid()\n",
    "\n",
    "@coroutine\n",
    "def background():\n",
    "    \"\"\"A backgorund coroutine to demonstrate that we aren't blocking\"\"\"\n",
    "    while True:\n",
    "        yield sleep(1)\n",
    "        print('.', end=' ')\n",
    "        sys.stdout.flush() # not needed after ipykernel 4.3\n",
    "\n",
    "@coroutine\n",
    "def work():\n",
    "    \"\"\"Submit some work and print the results when complete\"\"\"\n",
    "    for t in [ 1, 2, 3, 4 ]:\n",
    "        ar =  rc[:].apply(sleep_task, t)\n",
    "        result = yield ar # this waits\n",
    "        print(result)\n",
    "    \n",
    "loop = IOLoop()\n",
    "loop.add_callback(background)\n",
    "loop.run_sync(work)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "So if you have an existing async application using coroutined and/or Futures,\n",
    "you can now integrate IPython Parallel as a standard async component for submitting work and waiting for its results."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Executors\n",
    "\n",
    "Executors are a standard Python API provided by various job-submission tools.\n",
    "A standard API such as Executor is useful for different libraries to expose this common API for asynchronous execution,\n",
    "because it means different implementations can be easily swapped out for each other and compared,\n",
    "or the best one for a given context can be used without having to change the code.\n",
    "\n",
    "With IPython Parallel, every View has an `.executor` property, to provide the Executor API for the given View.\n",
    "Just like Views, the assignment of work for an Executor depends on the View from which it was created.\n",
    "\n",
    "You can get an Executor for any View by accessing `View.executor`:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[0, 1, 2, 3]"
      ]
     },
     "execution_count": 10,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "ex_all = rc[:].executor\n",
    "ex_all.view.targets"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "7891\n",
      "7885\n",
      "7891\n",
      "7891\n",
      "7891\n",
      "7891\n",
      "7885\n",
      "7885\n",
      "7885\n",
      "7885\n"
     ]
    }
   ],
   "source": [
    "even_lbview = rc.load_balanced_view(targets=rc.ids[::2])\n",
    "ex_even = even_lbview.executor\n",
    "for pid in ex_even.map(lambda x: os.getpid(), range(10)):\n",
    "    print(pid)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Typically, though, one will want an Executor for a LoadBalancedView on all the engines. In which case, use the top-level Client.executor method:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "<LoadBalancedView None>"
      ]
     },
     "execution_count": 12,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "ex = rc.executor()\n",
    "ex.view"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Comparing Executors\n",
    "\n",
    "Let's make a few Executors:\n",
    "\n",
    "The distributed executor assumes you have started a distributed cluster on the default local interface, e.g.\n",
    "\n",
    "    $> dcluster 127.0.0.1 127.0.0.1 127.0.0.1 127.0.0.1"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor\n",
    "import distributed\n",
    "\n",
    "class DistributedExecutor(distributed.Executor):\n",
    "    \"\"\"Wrap distributed.Executor to provide standard Executor.map API\n",
    "    \n",
    "    distributed.Executor.map returns list of Futures,\n",
    "    not iterable of results, like everything else.\n",
    "    \n",
    "    See blaze/distributed#91\n",
    "    \"\"\"\n",
    "    def map(self, *args, **kwargs):\n",
    "        list_of_futures = super().map(*args, **kwargs)\n",
    "        for f in list_of_futures:\n",
    "            yield f.result()\n",
    "\n",
    "N = 4\n",
    "ip_ex = rc.executor(targets=range(N))\n",
    "dist_ex = DistributedExecutor('127.0.0.1:8787')\n",
    "thread_ex = ThreadPoolExecutor(N)\n",
    "process_ex = ProcessPoolExecutor(N)\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "executors = [process_ex, thread_ex, ip_ex, dist_ex]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We can submit the same work with the same API,\n",
    "using four different mechanisms for distributing work.\n",
    "The results will be the same:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "ProcessPoolExecutor\n",
      "['0', '1', '2', '3', '4']\n",
      "ThreadPoolExecutor\n",
      "['0', '1', '2', '3', '4']\n",
      "ViewExecutor\n",
      "['0', '1', '2', '3', '4']\n",
      "DistributedExecutor\n",
      "['0', '1', '2', '3', '4']\n"
     ]
    }
   ],
   "source": [
    "for executor in executors:\n",
    "    print(executor.__class__.__name__)\n",
    "    it = executor.map(str, range(5))\n",
    "    print(list(it))\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This makes it easy to compare the different implementations. We are going to submit some dummy work—allocate and compute 2-norms of arrays of various sizes."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "def task(n):\n",
    "    \"\"\"Generates a 1xN array and computes its 2-norm\"\"\"\n",
    "    import numpy\n",
    "    from numpy.linalg import norm\n",
    "    A = numpy.ones(n)\n",
    "    return norm(A, 2)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "array([ 1048576,  1261463,  1517571,  1825676,  2196334,  2642245,\n",
       "        3178688,  3824041,  4600417,  5534417,  6658042,  8009791,\n",
       "        9635980, 11592325, 13945857, 16777216])"
      ]
     },
     "execution_count": 24,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "sizes = np.logspace(20, 24, 16, base=2, dtype=int)\n",
    "sizes"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Run the work locally, to get a reference:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Local time:\n",
      "CPU times: user 161 ms, sys: 404 ms, total: 565 ms\n",
      "Wall time: 560 ms\n"
     ]
    }
   ],
   "source": [
    "print(\"Local time:\")\n",
    "%time ref = list(map(task, sizes))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "And then run again with the various Executors:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "ProcessPoolExecutor\n",
      "10 loops, best of 3: 126 ms per loop\n",
      "ThreadPoolExecutor\n",
      "10 loops, best of 3: 149 ms per loop\n",
      "ViewExecutor\n",
      "10 loops, best of 3: 151 ms per loop\n",
      "DistributedExecutor\n",
      "10 loops, best of 3: 141 ms per loop\n"
     ]
    }
   ],
   "source": [
    "for executor in executors:\n",
    "    print(executor.__class__.__name__)\n",
    "    result = executor.map(task, sizes)\n",
    "    rlist = list(result)\n",
    "    assert rlist == ref, \"%s != %s\" % (rlist, ref)\n",
    "    # time the task assignment\n",
    "    %timeit list(executor.map(task, sizes))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "For this toy work, the stdlib ProcessPoolExecutor appears to perform the best (though in testing, it seems to crash quite a bit). That's useful info.\n",
    "One benefit of IPython Parallel or Distributed Executors over the stdlib Executors is that they do not have to be confined to a single machine.\n",
    "This means the standard Executor API lets you develop small-scale parallel tools that run locally in threads or processes,\n",
    "and then extend the *exact same code* to make use of multiple machines,\n",
    "just by selecting a different Executor.\n",
    "\n",
    "That seems pretty useful."
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.5.1"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}
